// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

pub use engine_traits::SstCompressionType;
use external_storage::{locking::RemoteLock, ExternalStorage};
use futures::{io::AsyncReadExt, stream::TryStreamExt};
use tikv_util::warn;

use crate::{
    errors::Result,
    execute::hooking::{AbortedCtx, AfterFinishCtx, BeforeStartCtx, ExecHooks},
    storage::LOCK_PREFIX,
    util::storage_url,
    ErrorKind, TraceResultExt,
};

#[derive(Default)]
pub struct StorageConsistencyGuard {
    lock: Option<RemoteLock>,
}

async fn load_storage_checkpoint(storage: &dyn ExternalStorage) -> Result<Option<u64>> {
    let path = "v1/global_checkpoint/";
    storage
        .iter_prefix(path)
        .err_into()
        .try_fold(None, |i, v| async move {
            if !v.key.ends_with(".ts") {
                return Ok(i);
            }
            let mut ts = vec![];
            storage.read(&v.key).read_to_end(&mut ts).await?;
            let ts_bytes = <[u8; 8]>::try_from(ts);
            let ts = match ts_bytes {
                Ok(bytes) => u64::from_le_bytes(bytes),
                Err(_) => {
                    warn!("Cannot parse ts from file."; "file" => %v.key);
                    return Ok(i);
                }
            };
            let res = match i {
                None => Some(ts),
                Some(ts0) => Some(ts.min(ts0)),
            };
            Ok(res)
        })
        .await
}

impl ExecHooks for StorageConsistencyGuard {
    async fn before_execution_started(&mut self, cx: BeforeStartCtx<'_>) -> Result<()> {
        use external_storage::locking::LockExt;

        let cp = load_storage_checkpoint(cx.storage)
            .await
            .annotate("failed to load storage checkpoint")?;
        match cp {
            Some(cp) => {
                if cx.this.cfg.until_ts > cp {
                    let err_msg = format!(
                        "The `--until`({}) is greater than the checkpoint({}). We cannot compact unstable content for now.",
                        cx.this.cfg.until_ts, cp
                    );

                    // We use `?` instead of return here to keep the stack frame in the error.
                    // Or if we use `.into()` the frame attached will be the default implementation
                    // of `Into`...
                    Err(ErrorKind::Other(err_msg))?;
                }
            }
            None => {
                let url = storage_url(cx.storage);
                warn!("No checkpoint loaded, maybe wrong storage used?"; "url" => %url);
                Err(ErrorKind::Other(format!(
                    "Cannot load checkpoint from {}",
                    url
                )))?;
            }
        }

        let hint = format!(
            "This is generated by the compaction {}.",
            cx.this.gen_name()
        );
        self.lock = Some(cx.storage.lock_for_read(LOCK_PREFIX, hint).await?);

        Ok(())
    }

    async fn after_execution_finished(&mut self, cx: AfterFinishCtx<'_>) -> Result<()> {
        if let Some(lock) = self.lock.take() {
            lock.unlock(cx.storage).await?;
        }
        Ok(())
    }

    async fn on_aborted(&mut self, cx: AbortedCtx<'_>) {
        if let Some(lock) = self.lock.take() {
            warn!("It seems compaction failed. Resolving the lock."; "err" => %cx.err);
            if let Err(err) = lock.unlock(cx.storage).await {
                warn!("Failed to unlock when failed, you may resolve the lock manually"; "err" => %err, "lock" => ?lock);
            }
        }
    }
}
